package com.example.demo.stream;

import com.example.demo.entity.Event;
import com.example.demo.mapper.EventMapper;
import com.example.demo.stream.sink.MySQLSink;
import com.example.demo.stream.sink.RedisSink;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

/**
 * flink 读取各种来源作为输入流
 * type :1 写入csv文件
 * type :2 写入DB
 * type :3 写入redis
 */
public class DataStreamOutput {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 自定义测试数据源
        DataStreamSource<Event> stream = env.fromElements(
                new Event(1L, "Mary", "./home", 1000L),
                new Event(2L, "Bob", "./cart", 2000L),
                new Event(3L, "Alice", "./prod?id=100", 3000L),
                new Event(4L, "Bob", "./prod?id=1", 3300L),
                new Event(5L, "Bob", "./home", 3500L),
                new Event(6L, "Alice", "./prod?id=200", 3200L),
                new Event(7L, "Bob", "./prod?id=2", 3800L),
                new Event(8L, "Bob", "./prod?id=3", 4200L)
        );
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String type = parameterTool.get("type", "0");
        switch (type) {
            case "1":
                // 写入到csv
                StreamingFileSink<Event> streamingFileSink = StreamingFileSink.<Event>forRowFormat(new Path("D:\\java\\sink.csv"), new SimpleStringEncoder<>("UTF-8"))
                        .withRollingPolicy( // 滚动策略
                                DefaultRollingPolicy.builder()
                                        // 文件大小
                                        .withMaxPartSize(1024 * 1024 * 1024)
                                        // 时间间隔
                                        .withRolloverInterval(TimeUnit.MINUTES.toSeconds(5))
                                        // 隔五分钟数据没有来,开始准备新的文件
                                        .withInactivityInterval(TimeUnit.MINUTES.toSeconds(5))
                                        .build()
                        ).build();
                stream.addSink(streamingFileSink);
                break;
            case "2":
                // 写入到mySQL
                stream.addSink(new MySQLSink<>(EventMapper.class));
                break;
            case "3":
                // 写入redis
                stream.addSink(new RedisSink<>("event"));
                break;
            default:
                throw new RuntimeException("数据源类型传入错误！");
        }
        JobExecutionResult execute = env.execute();
        System.out.println("execute = " + execute);
    }

//    // 自定义类实现Sink接口
//    public static class RedisSink extends RichSinkFunction<Event> {
//        Jedis jedis;
//
//        /**
//         * 输出源逻辑方法
//         * @param value 流中当前的对象
//         */
//        @Override
//        public void invoke(Event value, Context context) {
//            String redisKey = "hsetTest";
//            AtomicReference<String> count = new AtomicReference<>(" ");
//            Map<String, String> stringStringMap = jedis.hgetAll(redisKey);
//            if (MapUtils.isNotEmpty(stringStringMap)) {
//                stringStringMap.forEach((key, val) -> {
//                    if (key.contains(value.getUser())) {
//                        if (key.contains("-")) {
//                            String[] split = key.split("-");
//                            count.set("-" + (Integer.parseInt(split[1]) + 1));
//                        } else {
//                            count.set("-" + "2");
//                        }
//                        jedis.hset(redisKey, value.getUser() + count, JSONUtil.toJsonStr(value));
//                    }
//                });
//            }
//            // 以hash方式写入到redis
//            jedis.hset(redisKey, value.getUser() + count, JSONUtil.toJsonStr(value));
//        }
//
//
//        @Override
//        public void open(Configuration parameters) throws Exception {
//            super.open(parameters);
//            JedisPool jedisPool = SpringContextHolder.getBean(JedisPool.class);
//            // 获取jedis连接对象
//            jedis = jedisPool.getResource();
//        }
//
//        @Override
//        public void close() throws Exception {
//            super.close();
//            if (Objects.nonNull(jedis))
//                jedis.close();
//        }
//    }

}

